/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.avro;

import static org.apache.parquet.avro.AvroTestUtil.array;
import static org.apache.parquet.avro.AvroTestUtil.field;
import static org.apache.parquet.avro.AvroTestUtil.instance;
import static org.apache.parquet.avro.AvroTestUtil.optional;
import static org.apache.parquet.avro.AvroTestUtil.optionalField;
import static org.apache.parquet.avro.AvroTestUtil.primitive;
import static org.apache.parquet.avro.AvroTestUtil.record;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.DirectWriterTest;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

public class TestArrayCompatibility extends DirectWriterTest {

  public static final Configuration OLD_BEHAVIOR_CONF = new Configuration();
  public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();

  @BeforeClass
  public static void setupNewBehaviorConfiguration() {
    OLD_BEHAVIOR_CONF.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, true);
    NEW_BEHAVIOR_CONF.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
  }

  @Test
  @Ignore(value = "Not yet supported")
  public void testUnannotatedListOfPrimitives() throws Exception {
    Path test =
        writeDirect("message UnannotatedListOfPrimitives {" + "  repeated int32 list_of_ints;" + "}", rc -> {
          rc.startMessage();
          rc.startField("list_of_ints", 0);

          rc.addInteger(34);
          rc.addInteger(35);
          rc.addInteger(36);

          rc.endField("list_of_ints", 0);
          rc.endMessage();
        });

    Schema expectedSchema = record("OldPrimitiveInList", field("list_of_ints", array(primitive(Schema.Type.INT))));

    GenericRecord expectedRecord = instance(expectedSchema, "list_of_ints", Arrays.asList(34, 35, 36));

    // both should behave the same way
    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
  }

  @Test
  @Ignore(value = "Not yet supported")
  public void testUnannotatedListOfGroups() throws Exception {
    Path test = writeDirect(
        "message UnannotatedListOfGroups {" + "  repeated group list_of_points {"
            + "    required float x;"
            + "    required float y;"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("list_of_points", 0);

          rc.startGroup();
          rc.startField("x", 0);
          rc.addFloat(1.0f);
          rc.endField("x", 0);
          rc.startField("y", 1);
          rc.addFloat(1.0f);
          rc.endField("y", 1);
          rc.endGroup();

          rc.startGroup();
          rc.startField("x", 0);
          rc.addFloat(2.0f);
          rc.endField("x", 0);
          rc.startField("y", 1);
          rc.addFloat(2.0f);
          rc.endField("y", 1);
          rc.endGroup();

          rc.endField("list_of_points", 0);
          rc.endMessage();
        });

    Schema point = record("?", field("x", primitive(Schema.Type.FLOAT)), field("y", primitive(Schema.Type.FLOAT)));
    Schema expectedSchema = record("OldPrimitiveInList", field("list_of_points", array(point)));

    GenericRecord expectedRecord = instance(
        expectedSchema,
        "list_of_points",
        Arrays.asList(instance(point, "x", 1.0f, "y", 1.0f), instance(point, "x", 2.0f, "y", 2.0f)));

    // both should behave the same way
    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
  }

  @Test
  public void testRepeatedPrimitiveInList() throws Exception {
    Path test = writeDirect(
        "message RepeatedPrimitiveInList {" + "  required group list_of_ints (LIST) {"
            + "    repeated int32 array;"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("list_of_ints", 0);

          rc.startGroup();
          rc.startField("array", 0);

          rc.addInteger(34);
          rc.addInteger(35);
          rc.addInteger(36);

          rc.endField("array", 0);
          rc.endGroup();

          rc.endField("list_of_ints", 0);
          rc.endMessage();
        });

    Schema expectedSchema =
        record("RepeatedPrimitiveInList", field("list_of_ints", array(Schema.create(Schema.Type.INT))));

    GenericRecord expectedRecord = instance(expectedSchema, "list_of_ints", Arrays.asList(34, 35, 36));

    // both should behave the same way
    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
  }

  @Test
  public void testMultiFieldGroupInList() throws Exception {
    // tests the missing element layer, detected by a multi-field group
    Path test = writeDirect(
        "message MultiFieldGroupInList {" + "  optional group locations (LIST) {"
            + "    repeated group element {"
            + "      required double latitude;"
            + "      required double longitude;"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("locations", 0);

          rc.startGroup();
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(0.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(180.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup();

          rc.endField("locations", 0);
          rc.endMessage();
        });

    Schema location = record(
        "element",
        field("latitude", primitive(Schema.Type.DOUBLE)),
        field("longitude", primitive(Schema.Type.DOUBLE)));
    Schema expectedSchema = record("MultiFieldGroupInList", optionalField("locations", array(location)));

    GenericRecord expectedRecord = instance(
        expectedSchema,
        "locations",
        Arrays.asList(
            instance(location, "latitude", 0.0, "longitude", 0.0),
            instance(location, "latitude", 0.0, "longitude", 180.0)));

    // both should behave the same way
    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
  }

  @Test
  public void testSingleFieldGroupInList() throws Exception {
    // this tests the case where non-avro older data has an ambiguous list
    Path test = writeDirect(
        "message SingleFieldGroupInList {" + "  optional group single_element_groups (LIST) {"
            + "    repeated group single_element_group {"
            + "      required int64 count;"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("single_element_groups", 0);

          rc.startGroup();
          rc.startField("single_element_group", 0); // start writing array contents

          rc.startGroup();
          rc.startField("count", 0);
          rc.addLong(1234L);
          rc.endField("count", 0);
          rc.endGroup();

          rc.startGroup();
          rc.startField("count", 0);
          rc.addLong(2345L);
          rc.endField("count", 0);
          rc.endGroup();

          rc.endField("single_element_group", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("single_element_groups", 0);
          rc.endMessage();
        });

    // can't tell from storage whether this should be a list of single-field
    // records or if the single_field_group layer is synthetic.

    // old behavior - assume that the repeated type is the element type
    Schema singleElementGroupSchema = record("single_element_group", field("count", primitive(Schema.Type.LONG)));
    Schema oldSchema = record(
        "SingleFieldGroupInList", optionalField("single_element_groups", array(singleElementGroupSchema)));
    GenericRecord oldRecord = instance(
        oldSchema,
        "single_element_groups",
        Arrays.asList(
            instance(singleElementGroupSchema, "count", 1234L),
            instance(singleElementGroupSchema, "count", 2345L)));

    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);

    // new behavior - assume that single_element_group is synthetic (in spec)
    Schema newSchema = record(
        "SingleFieldGroupInList", optionalField("single_element_groups", array(primitive(Schema.Type.LONG))));
    GenericRecord newRecord = instance(newSchema, "single_element_groups", Arrays.asList(1234L, 2345L));

    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
  }

  @Test
  public void testSingleFieldGroupInListWithSchema() throws Exception {
    // this tests the case where older data has an ambiguous structure, but the
    // correct interpretation can be determined from the avro schema

    Schema singleElementRecord = record("single_element_group", field("count", primitive(Schema.Type.LONG)));

    Schema expectedSchema =
        record("SingleFieldGroupInList", optionalField("single_element_groups", array(singleElementRecord)));

    Map<String, String> metadata = new HashMap<String, String>();
    metadata.put(AvroWriteSupport.AVRO_SCHEMA, expectedSchema.toString());

    Path test = writeDirect(
        "message SingleFieldGroupInList {" + "  optional group single_element_groups (LIST) {"
            + "    repeated group single_element_group {"
            + "      required int64 count;"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("single_element_groups", 0);

          rc.startGroup();
          rc.startField("single_element_group", 0); // start writing array contents

          rc.startGroup();
          rc.startField("count", 0);
          rc.addLong(1234L);
          rc.endField("count", 0);
          rc.endGroup();

          rc.startGroup();
          rc.startField("count", 0);
          rc.addLong(2345L);
          rc.endField("count", 0);
          rc.endGroup();

          rc.endField("single_element_group", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("single_element_groups", 0);
          rc.endMessage();
        },
        metadata);

    GenericRecord expectedRecord = instance(
        expectedSchema,
        "single_element_groups",
        Arrays.asList(
            instance(singleElementRecord, "count", 1234L), instance(singleElementRecord, "count", 2345L)));

    // both should behave the same way because the schema is present
    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
  }

  @Test
  public void testNewOptionalGroupInList() throws Exception {
    Path test = writeDirect(
        "message NewOptionalGroupInList {" + "  optional group locations (LIST) {"
            + "    repeated group list {"
            + "      optional group element {"
            + "        required double latitude;"
            + "        required double longitude;"
            + "      }"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("locations", 0);

          rc.startGroup();
          rc.startField("list", 0); // start writing array contents

          // write a non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(0.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          // write a null element (element field is omitted)
          rc.startGroup(); // array level
          rc.endGroup(); // array level

          // write a second non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(180.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          rc.endField("list", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("locations", 0);
          rc.endMessage();
        });

    Schema location = record(
        "element",
        field("latitude", primitive(Schema.Type.DOUBLE)),
        field("longitude", primitive(Schema.Type.DOUBLE)));

    // old behavior - assume that the repeated type is the element type
    Schema elementRecord = record("list", optionalField("element", location));
    Schema oldSchema = record("NewOptionalGroupInList", optionalField("locations", array(elementRecord)));
    GenericRecord oldRecord = instance(
        oldSchema,
        "locations",
        Arrays.asList(
            instance(elementRecord, "element", instance(location, "latitude", 0.0, "longitude", 0.0)),
            instance(elementRecord),
            instance(elementRecord, "element", instance(location, "latitude", 0.0, "longitude", 180.0))));

    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);

    // new behavior - assume that single_element_group is synthetic (in spec)
    Schema newSchema = record("NewOptionalGroupInList", optionalField("locations", array(optional(location))));
    GenericRecord newRecord = instance(
        newSchema,
        "locations",
        Arrays.asList(
            instance(location, "latitude", 0.0, "longitude", 0.0),
            null,
            instance(location, "latitude", 0.0, "longitude", 180.0)));

    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
  }

  @Test
  public void testNewRequiredGroupInList() throws Exception {
    Path test = writeDirect(
        "message NewRequiredGroupInList {" + "  optional group locations (LIST) {"
            + "    repeated group list {"
            + "      required group element {"
            + "        required double latitude;"
            + "        required double longitude;"
            + "      }"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("locations", 0);

          rc.startGroup();
          rc.startField("list", 0); // start writing array contents

          // write a non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(180.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          // write a second non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(0.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          rc.endField("list", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("locations", 0);
          rc.endMessage();
        });

    Schema location = record(
        "element",
        field("latitude", primitive(Schema.Type.DOUBLE)),
        field("longitude", primitive(Schema.Type.DOUBLE)));

    // old behavior - assume that the repeated type is the element type
    Schema elementRecord = record("list", field("element", location));
    Schema oldSchema = record("NewRequiredGroupInList", optionalField("locations", array(elementRecord)));
    GenericRecord oldRecord = instance(
        oldSchema,
        "locations",
        Arrays.asList(
            instance(elementRecord, "element", instance(location, "latitude", 0.0, "longitude", 180.0)),
            instance(elementRecord, "element", instance(location, "latitude", 0.0, "longitude", 0.0))));

    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);

    // new behavior - assume that single_element_group is synthetic (in spec)
    Schema newSchema = record("NewRequiredGroupInList", optionalField("locations", array(location)));
    GenericRecord newRecord = instance(
        newSchema,
        "locations",
        Arrays.asList(
            instance(location, "latitude", 0.0, "longitude", 180.0),
            instance(location, "latitude", 0.0, "longitude", 0.0)));

    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
  }

  @Test
  public void testAvroCompatOptionalGroupInList() throws Exception {
    Path test = writeDirect(
        "message AvroCompatOptionalGroupInList {" + "  optional group locations (LIST) {"
            + "    repeated group array {"
            + "      optional group element {"
            + "        required double latitude;"
            + "        required double longitude;"
            + "      }"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("locations", 0);

          rc.startGroup();
          rc.startField("array", 0); // start writing array contents

          // write a non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(180.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          // write a second non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(0.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          rc.endField("array", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("locations", 0);
          rc.endMessage();
        });

    Schema location = record(
        "element",
        field("latitude", primitive(Schema.Type.DOUBLE)),
        field("longitude", primitive(Schema.Type.DOUBLE)));

    // old behavior - assume that the repeated type is the element type
    Schema elementRecord = record("array", optionalField("element", location));
    Schema oldSchema = record("AvroCompatOptionalGroupInList", optionalField("locations", array(elementRecord)));
    GenericRecord oldRecord = instance(
        oldSchema,
        "locations",
        Arrays.asList(
            instance(elementRecord, "element", instance(location, "latitude", 0.0, "longitude", 180.0)),
            instance(elementRecord, "element", instance(location, "latitude", 0.0, "longitude", 0.0))));

    // both should detect the "array" name
    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
  }

  @Test
  public void testAvroCompatOptionalGroupInListWithSchema() throws Exception {
    Path test = writeDirect(
        "message AvroCompatOptionalGroupInListWithSchema {" + "  optional group locations (LIST) {"
            + "    repeated group my_list {"
            + "      optional group element {"
            + "        required double latitude;"
            + "        required double longitude;"
            + "      }"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("locations", 0);

          rc.startGroup();
          rc.startField("my_list", 0); // start writing array contents

          // write a non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(180.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          // write a second non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(0.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          rc.endField("array", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("locations", 0);
          rc.endMessage();
        });

    Schema location = record(
        "element",
        field("latitude", primitive(Schema.Type.DOUBLE)),
        field("longitude", primitive(Schema.Type.DOUBLE)));

    Schema newSchema = record(
        "AvroCompatOptionalGroupInListWithSchema", optionalField("locations", array(optional(location))));
    GenericRecord newRecord = instance(
        newSchema,
        "locations",
        Arrays.asList(
            instance(location, "latitude", 0.0, "longitude", 180.0),
            instance(location, "latitude", 0.0, "longitude", 0.0)));

    Configuration oldConfWithSchema = new Configuration();
    AvroReadSupport.setAvroReadSchema(oldConfWithSchema, newSchema);

    // both should use the schema structure that is provided
    assertReaderContains(new AvroParquetReader<GenericRecord>(oldConfWithSchema, test), newSchema, newRecord);

    Configuration newConfWithSchema = new Configuration(NEW_BEHAVIOR_CONF);
    AvroReadSupport.setAvroReadSchema(newConfWithSchema, newSchema);

    assertReaderContains(new AvroParquetReader<GenericRecord>(newConfWithSchema, test), newSchema, newRecord);
  }

  @Test
  public void testAvroCompatListInList() throws Exception {
    Path test = writeDirect(
        "message AvroCompatListInList {" + "  optional group listOfLists (LIST) {"
            + "    repeated group array (LIST) {"
            + "      repeated int32 array;"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("locations", 0);

          rc.startGroup();
          rc.startField("array", 0); // start writing array contents

          rc.startGroup();
          rc.startField("array", 0); // start writing inner array contents

          // write [34, 35, 36]
          rc.addInteger(34);
          rc.addInteger(35);
          rc.addInteger(36);

          rc.endField("array", 0); // finished writing inner array contents
          rc.endGroup();

          // write an empty list
          rc.startGroup();
          rc.endGroup();

          rc.startGroup();
          rc.startField("array", 0); // start writing inner array contents

          // write [32, 33, 34]
          rc.addInteger(32);
          rc.addInteger(33);
          rc.addInteger(34);

          rc.endField("array", 0); // finished writing inner array contents
          rc.endGroup();

          rc.endField("array", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("locations", 0);
          rc.endMessage();
        });

    Schema listOfLists = array(array(primitive(Schema.Type.INT)));
    Schema oldSchema = record("AvroCompatListInList", optionalField("listOfLists", listOfLists));

    GenericRecord oldRecord = instance(
        oldSchema,
        "listOfLists",
        Arrays.asList(Arrays.asList(34, 35, 36), Arrays.asList(), Arrays.asList(32, 33, 34)));

    // both should detect the "array" name
    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
  }

  @Test
  public void testThriftCompatListInList() throws Exception {
    Path test = writeDirect(
        "message ThriftCompatListInList {" + "  optional group listOfLists (LIST) {"
            + "    repeated group listOfLists_tuple (LIST) {"
            + "      repeated int32 listOfLists_tuple_tuple;"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("locations", 0);

          rc.startGroup();
          rc.startField("listOfLists_tuple", 0); // start writing array contents

          rc.startGroup();
          rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents

          // write [34, 35, 36]
          rc.addInteger(34);
          rc.addInteger(35);
          rc.addInteger(36);

          rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
          rc.endGroup();

          // write an empty list
          rc.startGroup();
          rc.endGroup();

          rc.startGroup();
          rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents

          // write [32, 33, 34]
          rc.addInteger(32);
          rc.addInteger(33);
          rc.addInteger(34);

          rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
          rc.endGroup();

          rc.endField("listOfLists_tuple", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("locations", 0);
          rc.endMessage();
        });

    Schema listOfLists = array(array(primitive(Schema.Type.INT)));
    Schema oldSchema = record("ThriftCompatListInList", optionalField("listOfLists", listOfLists));

    GenericRecord oldRecord = instance(
        oldSchema,
        "listOfLists",
        Arrays.asList(Arrays.asList(34, 35, 36), Arrays.asList(), Arrays.asList(32, 33, 34)));

    // both should detect the "_tuple" names
    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
  }

  @Test
  public void testThriftCompatRequiredGroupInList() throws Exception {
    Path test = writeDirect(
        "message ThriftCompatRequiredGroupInList {" + "  optional group locations (LIST) {"
            + "    repeated group locations_tuple {"
            + "      optional group element {"
            + "        required double latitude;"
            + "        required double longitude;"
            + "      }"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("locations", 0);

          rc.startGroup();
          rc.startField("locations_tuple", 0); // start writing array contents

          // write a non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(180.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          // write a second non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(0.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          rc.endField("locations_tuple", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("locations", 0);
          rc.endMessage();
        });

    Schema location = record(
        "element",
        field("latitude", primitive(Schema.Type.DOUBLE)),
        field("longitude", primitive(Schema.Type.DOUBLE)));

    // old behavior - assume that the repeated type is the element type
    Schema elementRecord = record("locations_tuple", optionalField("element", location));
    Schema oldSchema = record("ThriftCompatRequiredGroupInList", optionalField("locations", array(elementRecord)));
    GenericRecord oldRecord = instance(
        oldSchema,
        "locations",
        Arrays.asList(
            instance(elementRecord, "element", instance(location, "latitude", 0.0, "longitude", 180.0)),
            instance(elementRecord, "element", instance(location, "latitude", 0.0, "longitude", 0.0))));

    // both should detect the "array" name
    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
  }

  @Test
  public void testHiveCompatOptionalGroupInList() throws Exception {
    Path test = writeDirect(
        "message HiveCompatOptionalGroupInList {" + "  optional group locations (LIST) {"
            + "    repeated group bag {"
            + "      optional group element {"
            + "        required double latitude;"
            + "        required double longitude;"
            + "      }"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("locations", 0);

          rc.startGroup();
          rc.startField("bag", 0); // start writing array contents

          // write a non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(180.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          // write a second non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(0.0);
          rc.endField("longitude", 1);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          rc.endField("bag", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("locations", 0);
          rc.endMessage();
        });

    Schema location = record(
        "element",
        field("latitude", primitive(Schema.Type.DOUBLE)),
        field("longitude", primitive(Schema.Type.DOUBLE)));

    // old behavior - assume that the repeated type is the element type
    Schema elementRecord = record("bag", optionalField("element", location));
    Schema oldSchema = record("HiveCompatOptionalGroupInList", optionalField("locations", array(elementRecord)));
    GenericRecord oldRecord = instance(
        oldSchema,
        "locations",
        Arrays.asList(
            instance(elementRecord, "element", instance(location, "latitude", 0.0, "longitude", 180.0)),
            instance(elementRecord, "element", instance(location, "latitude", 0.0, "longitude", 0.0))));

    // both should detect the "array" name
    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);

    Schema newSchema =
        record("HiveCompatOptionalGroupInList", optionalField("locations", array(optional(location))));
    GenericRecord newRecord = instance(
        newSchema,
        "locations",
        Arrays.asList(
            instance(location, "latitude", 0.0, "longitude", 180.0),
            instance(location, "latitude", 0.0, "longitude", 0.0)));

    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
  }

  @Test
  public void testListOfSingleElementStructsWithElementField() throws Exception {
    Path test = writeDirect(
        "message ListOfSingleElementStructsWithElementField {" + "  optional group list_of_structs (LIST) {"
            + "    repeated group list {"
            + "      required group element {"
            + "        required float element;"
            + "      }"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("list_of_structs", 0);

          rc.startGroup();
          rc.startField("list", 0); // start writing array contents

          // write a non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          // the inner element field
          rc.startGroup();
          rc.startField("element", 0);
          rc.addFloat(33.0F);
          rc.endField("element", 0);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          // write a second non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);

          // the inner element field
          rc.startGroup();
          rc.startField("element", 0);
          rc.addFloat(34.0F);
          rc.endField("element", 0);
          rc.endGroup();

          rc.endField("element", 0);
          rc.endGroup(); // array level

          rc.endField("list", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("list_of_structs", 0);
          rc.endMessage();
        });

    Schema structWithElementField = record("element", field("element", primitive(Schema.Type.FLOAT)));

    // old behavior - assume that the repeated type is the element type
    Schema elementRecord = record("list", field("element", structWithElementField));
    Schema oldSchema = record(
        "ListOfSingleElementStructsWithElementField", optionalField("list_of_structs", array(elementRecord)));
    GenericRecord oldRecord = instance(
        oldSchema,
        "list_of_structs",
        Arrays.asList(
            instance(elementRecord, "element", instance(structWithElementField, "element", 33.0F)),
            instance(elementRecord, "element", instance(structWithElementField, "element", 34.0F))));

    // check the schema
    final MessageType fileSchema;
    try (ParquetFileReader reader = ParquetFileReader.open(new Configuration(), test)) {
      fileSchema = reader.getFileMetaData().getSchema();
      Assert.assertEquals(
          "Converted schema should assume 2-layer structure",
          oldSchema,
          new AvroSchemaConverter(OLD_BEHAVIOR_CONF).convert(fileSchema));
    }

    // both should default to the 2-layer structure
    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);

    Schema newSchema = record(
        "ListOfSingleElementStructsWithElementField",
        optionalField("list_of_structs", array(structWithElementField)));
    GenericRecord newRecord = instance(
        newSchema,
        "list_of_structs",
        Arrays.asList(
            instance(structWithElementField, "element", 33.0F),
            instance(structWithElementField, "element", 34.0F)));

    // check the schema
    Assert.assertEquals(
        "Converted schema should assume 3-layer structure",
        newSchema,
        new AvroSchemaConverter(NEW_BEHAVIOR_CONF).convert(fileSchema));
    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);

    // check that this works with compatible nested schemas

    Schema structWithDoubleElementField = record("element", field("element", primitive(Schema.Type.DOUBLE)));

    Schema doubleElementRecord = record("list", field("element", structWithDoubleElementField));
    Schema oldDoubleSchema = record(
        "ListOfSingleElementStructsWithElementField",
        optionalField("list_of_structs", array(doubleElementRecord)));
    GenericRecord oldDoubleRecord = instance(
        oldDoubleSchema,
        "list_of_structs",
        Arrays.asList(
            instance(
                doubleElementRecord,
                "element",
                instance(structWithDoubleElementField, "element", 33.0)),
            instance(
                doubleElementRecord,
                "element",
                instance(structWithDoubleElementField, "element", 34.0))));
    assertReaderContains(oldBehaviorReader(test, oldDoubleSchema), oldDoubleSchema, oldDoubleRecord);

    Schema newDoubleSchema = record(
        "ListOfSingleElementStructsWithElementField",
        optionalField("list_of_structs", array(structWithDoubleElementField)));
    GenericRecord newDoubleRecord = instance(
        newDoubleSchema,
        "list_of_structs",
        Arrays.asList(
            instance(structWithDoubleElementField, "element", 33.0),
            instance(structWithDoubleElementField, "element", 34.0)));
    assertReaderContains(newBehaviorReader(test, newDoubleSchema), newDoubleSchema, newDoubleRecord);
  }

  @Test
  public void testIsElementTypeRequiredRepeatedRecord() {
    // Test `_tuple` style naming
    MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
        + "  required group list_field (LIST) {\n"
        + "    repeated group list_field_tuple (LIST) {\n"
        + "      repeated int32 int_field;\n"
        + "    }\n"
        + "  }\n"
        + "}");
    Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);

    Assert.assertTrue(AvroRecordConverter.isElementType(
        parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"),
        avroSchema.getFields().get(0).schema()));

    // Test `array` style naming
    parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
        + "  required group list_field (LIST) {\n"
        + "    repeated group array {\n"
        + "      required int32 a;\n"
        + "    }\n"
        + "  }\n"
        + "}");
    avroSchema = new AvroSchemaConverter().convert(parquetSchema);

    Assert.assertTrue(AvroRecordConverter.isElementType(
        parquetSchema.getType("list_field"),
        avroSchema.getFields().get(0).schema()));
  }

  @Test
  public void testIsElementTypeOptionalRepeatedRecord() {
    // Test `_tuple` style naming
    MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
        + "  optional group list_field (LIST) {\n"
        + "    repeated group list_field_tuple (LIST) {\n"
        + "      repeated int32 int_field;\n"
        + "    }\n"
        + "  }\n"
        + "}");
    Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);

    Assert.assertTrue(AvroRecordConverter.isElementType(
        parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"),
        avroSchema.getFields().get(0).schema()));

    // Test `array` style naming
    parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
        + "  optional group list_field (LIST) {\n"
        + "    repeated group array {\n"
        + "      required int32 a;\n"
        + "    }\n"
        + "  }\n"
        + "}");
    avroSchema = new AvroSchemaConverter().convert(parquetSchema);

    Assert.assertTrue(AvroRecordConverter.isElementType(
        parquetSchema.getType("list_field"),
        avroSchema.getFields().get(0).schema()));
  }

  @Test
  public void testIsElementTypeFailsInvalidSchema() throws Exception {
    Path test = writeDirect(
        "message MessageWithInvalidArraySchema {"
            + "  optional group locations (LIST) {"
            + "    repeated group array {"
            + "      optional group element {"
            + "        required double latitude;"
            + "        required double longitude;"
            + "      }"
            + "    }"
            + "  }"
            + "}",
        rc -> {
          rc.startMessage();
          rc.startField("locations", 0);

          rc.startGroup();
          rc.startField("array", 0); // start writing array contents

          // write a non-null element
          rc.startGroup(); // array level
          rc.startField("element", 0);
          rc.startGroup();
          rc.startField("latitude", 0);
          rc.addDouble(0.0);
          rc.endField("latitude", 0);
          rc.startField("longitude", 1);
          rc.addDouble(180.0);
          rc.endField("longitude", 1);
          rc.endGroup();
          rc.endField("element", 0);
          rc.endGroup(); // array level

          rc.endField("array", 0); // finished writing array contents
          rc.endGroup();

          rc.endField("locations", 0);
          rc.endMessage();
        });

    Schema location = record(
        "element",
        field("latitude", primitive(Schema.Type.DOUBLE)),
        field("longitude", primitive(Schema.Type.DOUBLE)));

    Schema newSchema =
        record("MessageWithInvalidArraySchema", optionalField("locations", array(optional(location))));
    GenericRecord newRecord = instance(
        newSchema,
        "locations",
        Arrays.asList(
            instance(location, "latitude", 0.0, "longitude", 180.0),
            instance(location, "latitude", 0.0, "longitude", 0.0)));

    Configuration oldConfWithSchema = new Configuration();
    AvroReadSupport.setAvroReadSchema(oldConfWithSchema, newSchema);

    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(oldConfWithSchema, test);
    Assert.assertThrows(InvalidRecordException.class, reader::read);
  }

  public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(Path path) throws IOException {
    return new AvroParquetReader<T>(OLD_BEHAVIOR_CONF, path);
  }

  public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(Path path, Schema expectedSchema)
      throws IOException {
    Configuration conf = new Configuration(OLD_BEHAVIOR_CONF);
    AvroReadSupport.setAvroReadSchema(conf, expectedSchema);
    return new AvroParquetReader<T>(conf, path);
  }

  public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(Path path) throws IOException {
    return new AvroParquetReader<T>(NEW_BEHAVIOR_CONF, path);
  }

  public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(Path path, Schema expectedSchema)
      throws IOException {
    Configuration conf = new Configuration(NEW_BEHAVIOR_CONF);
    AvroReadSupport.setAvroReadSchema(conf, expectedSchema);
    return new AvroParquetReader<T>(conf, path);
  }

  public <T extends IndexedRecord> void assertReaderContains(
      AvroParquetReader<T> reader, Schema expectedSchema, T... expectedRecords) throws IOException {
    for (T expectedRecord : expectedRecords) {
      T actualRecord = reader.read();
      Assert.assertEquals("Should match expected schema", expectedSchema, actualRecord.getSchema());
      Assert.assertEquals("Should match the expected record", expectedRecord, actualRecord);
    }
    Assert.assertNull(
        "Should only contain " + expectedRecords.length + " record" + (expectedRecords.length == 1 ? "" : "s"),
        reader.read());
  }
}
